package cn.mxleader.kafka.mxleaderspringkafkademo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;

/**
 * Spring configuration that registers a {@link CommandLineRunner} running a
 * Kafka Streams word-count demo.
 *
 * <p>The runner reads text records from the {@code active-session} topic, splits
 * each value into lower-cased words, counts the occurrences of every word and
 * writes the running counts to {@code sink_topic}. It blocks until the JVM is
 * asked to shut down and then terminates the process via {@link System#exit(int)}.
 */
@Configuration
public class KafkaStreamsHandlerConfiguration {

    /** Topic the demo consumes text records from. */
    private static final String SOURCE_TOPIC = "active-session";

    /** Topic the per-word counts are written to. */
    private static final String SINK_TOPIC = "sink_topic";

    /** Word separator: one or more whitespace characters; compiled once and reused. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    /**
     * Builds the runner that configures, starts and supervises the word-count topology.
     *
     * <p>The runner never returns normally: it exits the JVM with status {@code 0}
     * after the shutdown hook has released the latch, or with status {@code 1}
     * when starting or awaiting the topology fails.
     *
     * @return the command-line runner executing the demo
     */
    @Bean
    CommandLineRunner demo() {
        return args -> {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Record cache size set to 0 bytes.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            // Setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data.
            // Note: To re-run the demo, you need to use the offset reset tool:
            // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            StreamsBuilder builder = new StreamsBuilder();

            KStream<String, String> source = builder.stream(SOURCE_TOPIC);
            // TODO: the Chinese text encoding issue is still unresolved.
            KTable<String, Long> counts = source
                    .flatMapValues(value -> {
                        // Demo tracing of every incoming record value.
                        System.out.println(value);
                        return splitWords(value);
                    })
                    // Re-key each record by the word itself so that equal words are counted together.
                    .groupBy((key, value) -> value)
                    .count();

            // Need to override the value serde to Long type, the default value serde is String.
            counts.toStream().to(SINK_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            final CountDownLatch latch = new CountDownLatch(1);

            // Attach shutdown handler to catch control-c: close the topology, then release the waiting thread.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                streams.close();
                latch.countDown();
            }, "streams-wordcount-shutdown-hook"));

            try {
                streams.start();
                latch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag before terminating so the interruption is not lost.
                Thread.currentThread().interrupt();
                System.err.println("Kafka Streams word-count demo was interrupted: " + e);
                System.exit(1);
            } catch (Throwable e) {
                // Report the failure instead of exiting silently; no logger is available in this file.
                System.err.println("Kafka Streams word-count demo failed: " + e);
                e.printStackTrace();
                System.exit(1);
            }
            System.exit(0);
        };
    }

    /**
     * Splits a record value into lower-cased, non-empty words.
     *
     * <p>Lower-casing uses {@link Locale#ROOT} so the resulting keys do not depend
     * on the default locale of the JVM running the topology. Runs of whitespace
     * act as a single separator, so no empty tokens are produced.
     *
     * @param value the record value; may be {@code null}
     * @return the words in order of appearance; empty when {@code value} is
     *         {@code null} or contains only whitespace
     */
    private static List<String> splitWords(String value) {
        List<String> words = new ArrayList<>();
        if (value == null) {
            return words;
        }
        for (String token : WHITESPACE.split(value.toLowerCase(Locale.ROOT))) {
            // Leading whitespace yields one empty leading token; skip it.
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

}
